Skip to content

feat: @step(start=True)/@step(end=True) annotations, single-step flows, and node_info metadata#3120

Merged
talsperre merged 16 commits intomasterfrom
romain/worktree/mutate_graph
Apr 28, 2026
Merged

feat: @step(start=True)/@step(end=True) annotations, single-step flows, and node_info metadata#3120
talsperre merged 16 commits intomasterfrom
romain/worktree/mutate_graph

Conversation

@romain-intel
Copy link
Copy Markdown
Contributor

@romain-intel romain-intel commented Apr 16, 2026

PR Type

  • Bug fix
  • New feature
  • Core Runtime change (higher bar -- see CONTRIBUTING.md)
  • Docs / tooling
  • Refactoring

Summary

Makes the FlowSpec graph more flexible in three ways, with backward compatibility preserved throughout:

  1. Explicit start/end annotations. @step now accepts start=True and end=True kwargs. Steps no longer have to be named "start"/"end" — a flow can use any name as long as one step is annotated as the start and one as the end. When no annotations are present, the graph falls back to discovering steps by the names "start" and "end" (old behavior).

  2. Single-step flows. A single step annotated @step(start=True, end=True) is now valid; previously a FlowSpec required at least two steps.

  3. Per-node node_info metadata. DAGNode gains a node_info dict that extensions can populate to attach arbitrary per-step metadata. Live references are accessible via flow._graph; serialized values (via to_pod) flow into _graph_info. Callables are serialized as their qualified name.

These are core-runtime changes because they touch graph.py, lint.py, runtime.py, task.py, and every orchestrator plugin (Argo Workflows, Step Functions, Airflow) that reads the graph or emits events tied to the terminal step.

Key changes

Area What changed
decorators.py @step(start=False, end=False, node_info=None)
graph.py DAGNode.is_start_step, is_end_step, node_info; _identify_start_end uses annotations with name fallback; attribute name (not AST def name) is authoritative; tolerates sourceless single-step methods (for out-of-tree FunctionSpec extension)
lint.py check_basic_steps distinguishes "no start" from "multiple start annotations"; new check_start_end_degree validates in/out degrees; new check_annotation_name_conflict warns when @step(start=True) coexists with a step named "start" (same for end)
util.py to_pod serializes callables via __qualname__
runtime.py Persists start_step/end_step as metadata on the _parameters task
task.py, flowspec.py Use the graph's start/end step rather than literal "start"/"end" names
client/core.py Run._graph_endpoints reads endpoints from _parameters metadata with _graph_info fallback for orchestrated runs
plugins/argo/argo_workflows.py _matching_conditional_join falls back to graph.end_step; terminal-step detection through the DAG
plugins/argo/argo_workflows_decorator.py Auto-emitted event from the end step is also published with the well-known .end suffix so @trigger_on_finish subscribers — which don't know the publisher's end step name at deploy time — keep receiving triggers from custom-named end steps
plugins/aws/step_functions/step_functions.py Reads StartAt from the definition JSON instead of hardcoded ["States"]["start"]
plugins/airflow/airflow.py, airflow_cli.py Use graph.start_step/end_step instead of hardcoded names
plugins/cards/card_cli.py graph kwarg keeps the old dict-of-steps shape for backward compatibility with third-party card modules; new opt-in graph_info kwarg carries the full payload (steps + start/end). Card_cli probes the constructor signature and only passes graph_info to cards that accept it
plugins/cards/card_modules/basic.py DefaultCard, DefaultCardJSON, ErrorCard accept both graph and graph_info (preferred when provided)
plugins/cards/ui/ Svelte DAG renderer reads start/end dynamically instead of assuming "start"/"end"

Tests

  • Unit tests added/updated
  • Reproduction script provided (required for Core Runtime)
  • CI passes
  • If tests are impractical: explain why below and provide manual evidence above

New and updated tests cover:

  • Graph structure inference: standard flows, custom-named flows, single-step flows, branch flows, foreach flows, split-as-entry-step flows
  • @step(start=True)/@step(end=True) annotation mechanics and backward-compat with name-based discovery
  • Lint validation: degree checks, annotation-name conflicts, negative-path cases for malformed annotations
  • End-to-end execution of custom-named flows via Runner with card rendering
  • Trigger.from_runs on flows with a custom-named terminal step
  • Step Functions StartAt lookup from deployment JSON
  • Card graph transform accepting both legacy and new payload shapes

Non-Goals

  • A StepSpec/FunctionSpec construct (single-step FlowSpec-like class with init()+call() lifecycle) — hooks for it are present in DAGNode and _base_step_decorator (see module comment in graph.py), but the class itself is kept out-of-tree for now.
  • Multi-flow file support (a FlowSpec.main() classmethod to route between multiple FlowSpec subclasses in one file) — deferred.
  • Direct non-CLI invocation of FlowSpec — deferred.

AI Tool Usage

  • AI tools were used (describe below)

AI coding assistance was used during development. All generated code was reviewed and tested.

Copy link
Copy Markdown
Contributor Author

@romain-intel romain-intel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments to update PR.

Comment thread metaflow/graph.py
Comment thread metaflow/parameters.py Outdated
Comment thread metaflow/stepspec.py Outdated
Comment thread test/unit/graph_inference/flows/custom_branch_flow.py
Comment thread test/unit/graph_inference/test_stepspec.py Outdated
@romain-intel romain-intel force-pushed the romain/worktree/mutate_graph branch from 2d2b74a to 853620c Compare April 20, 2026 07:19
@romain-intel romain-intel changed the title Romain/worktree/mutate graph feat: flexible flow graph with start/end annotations, multi-flow files, and node metadata Apr 20, 2026
@talsperre talsperre marked this pull request as ready for review April 20, 2026 23:04
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Apr 20, 2026

Greptile Summary

This PR makes the FlowSpec graph more flexible by introducing explicit @step(start=True)/@step(end=True) annotations, single-step flows, and per-node node_info metadata — while preserving full backward compatibility with name-based "start"/"end" discovery. The implementation is thorough: graph.py, lint.py, runtime.py, all three orchestrator plugins, the card system, and the client API are all updated consistently. The test coverage (unit, graph-inference, UX) is solid. Remaining findings are all P2 style/quality issues with no blocking correctness problems beyond what has been flagged in previous review threads.

Confidence Score: 4/5

Safe to merge pending resolution of previously-flagged P1 threads; no new blocking issues found.

All new findings in this pass are P2 style/quality issues. Previously-flagged P1 issues (orchestrator _graph_endpoints fallback, step_functions hardcoded keys, Argo end-event alias) are addressed in the PR diff. Score sits at the P1-ceiling of 4 reflecting caution from those prior threads.

metaflow/lint.py (double-warning overlap); metaflow/util.py (overly broad callable check in to_pod); metaflow/client/core.py (_graph_endpoints fallback for orchestrated runs, already flagged in prior thread)

Important Files Changed

Filename Overview
metaflow/graph.py Core graph changes: DAGNode gains is_start_step/is_end_step/node_info; new _identify_start_end resolves start/end via annotations with name fallback; sourceless single-step node support for FunctionSpec; output_steps handles single-step and custom-named flows. Logic is sound with minor doc gap around the single-step implicit override.
metaflow/lint.py New checks check_start_end_degree and check_annotation_name_conflict added; existing checks updated for custom step names. Double-warning overlap between check_start_end_degree and check_that_end_is_end for end step with self.next() is a minor UX issue.
metaflow/decorators.py @step now accepts start/end/node_info kwargs via the f=None, *, ... pattern; FunctionSpec class-level decorator forwarding hook added. The f=my_func keyword footgun is minor; otherwise backward-compatible.
metaflow/client/core.py New _graph_endpoints property reads start/end step names from _parameters task metadata with ("start","end") fallback; end_task, trigger, and Step.parent/children navigation updated. For orchestrated runs without NativeRuntime metadata the fallback remains "start"/"end".
metaflow/events.py Trigger.from_runs rewritten to use end_task.parent.id and skip runs without an end task; dual-emits .end alias for custom-named end steps, mirroring argo_workflows_decorator.py.
metaflow/plugins/argo/argo_workflows.py All hardcoded "start"/"end" string references replaced with graph.start_step/end_step; _matching_conditional_join fallback fixed to use graph.end_step. Changes are correct and well-targeted.
metaflow/plugins/argo/argo_workflows_decorator.py task_post_step now dual-emits Argo events for custom-named end steps (actual name + ".end" alias) so @trigger_on_finish subscribers keep working. Logic is clean and idempotency is handled via stable pathspec id.
metaflow/plugins/aws/step_functions/step_functions.py get_existing_deployment and get_execution now read StartAt from the definition JSON to locate the start state rather than hardcoding "start"; defensive None-checking replaces chained .get() calls. Correct fix for custom start-step names.
metaflow/plugins/cards/card_cli.py graph dict (legacy shape) passed alongside new graph_info kwarg detected via inspect.signature; backward-compat preserved. Nested try/except for signature probe is somewhat defensive but harmless.
metaflow/runtime.py Hardcoded "start"/"end" step names replaced with graph.start_step/end_step throughout; new MetaDatum registration for start_step/end_step on _parameters task with graceful exception handling.
metaflow/util.py to_pod gains callable serialization via qualname; overly broad callable() check also matches class objects, which may produce unexpected serialization for non-function callables.

Reviews (22): Last reviewed commit: "test: slim _graph_endpoints unit tests, ..." | Re-trigger Greptile

Comment thread metaflow/flowspec.py Outdated
Comment thread metaflow/flowspec.py Outdated
Comment thread metaflow/graph.py
@talsperre talsperre force-pushed the romain/worktree/mutate_graph branch 8 times, most recently from 583fc92 to 2902c34 Compare April 21, 2026 06:49
Comment thread metaflow/client/core.py
Copy link
Copy Markdown
Contributor Author

@romain-intel romain-intel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor comments with a quick review.

Comment thread metaflow/plugins/aws/step_functions/step_functions.py
Comment thread metaflow/runtime.py
@talsperre talsperre force-pushed the romain/worktree/mutate_graph branch 6 times, most recently from 4ff334e to 23581b3 Compare April 22, 2026 05:38
Copy link
Copy Markdown
Contributor Author

@romain-intel romain-intel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nits. The workaround for no code will probably see evolution but it's not user facing (much). I'd just clarify a bit why it's there and we can merge.

Comment thread metaflow/graph.py
Comment thread metaflow/decorators.py
Comment thread metaflow/graph.py
Comment thread metaflow/graph.py
Comment thread metaflow/runtime.py
@romain-intel romain-intel changed the title feat: flexible flow graph with start/end annotations, multi-flow files, and node metadata feat: @step(start=True)/@step(end=True) annotations, single-step flows, and node_info metadata Apr 23, 2026
Comment thread metaflow/graph.py
Comment thread metaflow/plugins/cards/ui/src/components/dag/dag.svelte
Comment thread metaflow/graph.py
@talsperre talsperre force-pushed the romain/worktree/mutate_graph branch from 8abb8fc to accd71d Compare April 27, 2026 01:40
romain-intel and others added 2 commits April 27, 2026 23:36
Instead of enforcing a step named "start" and another named "end"
and a minimum of two steps, Metaflow now allows for single
step flows with any names. The "start" and "end" properties are
derived from the structure of the graph. We still require a single
entry point and a single exit point but they can be one and the same
and do not have to be named something specific.
Core fixes:
- events.py: use end_task.parent.id instead of hardcoded run_obj["end"]
- step_functions.py: use StartAt from definition JSON (both get_existing_deployment
  and get_execution) instead of hardcoded ["States"]["start"]
- argo_workflows.py: _matching_conditional_join uses self.graph.end_step
  instead of hardcoded "end" fallback
- graph.py: None guard in output_steps() raises clear ValueError
- client/core.py: narrow _graph_endpoints exception caching to
  (KeyError, MetaflowNotFound); transient errors return uncached fallback
- runtime.py: guard metadata registration with is_cloned check + try/except

Cards:
- card_cli.py: wrap graph_dict in payload with start_step/end_step metadata
- basic.py: accept new payload format, pass through start/end
- dag.svelte, step-wrapper.svelte, types.ts: dynamic start/end props

Lint:
- Improved check_basic_steps to distinguish "no start step" from "multiple
  @step(start=True) annotations"
- New check_annotation_name_conflict: warns when @step(start=True) coexists
  with a step named "start" (and likewise for end)

Tests:
- Negative-path tests for malformed annotation patterns (8 cases)
- Card rendering with custom endpoints
- Trigger.from_runs() with custom terminal step
- Step Functions StartAt lookup

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
talsperre and others added 14 commits April 27, 2026 23:36
FlowGraph._create_nodes walks dir(cls) by attribute name and stores
nodes[element] = node, but DAGNode.name was derived from the AST def
name. Those agreed only by convention. When a metaclass renames a
function via __name__ (e.g. FunctionSpec synthesizing a step), AST
parsing still reads the original def name, leaving nodes keyed by the
attribute name while node.name reports the def name. FlowSpec._init_graph
then does getattr(cls, node.name) and fails with AttributeError.

Pass the discovered attribute name explicitly to DAGNode in the AST
path, and have DAGNode prefer that over func_ast.name. Both paths
(AST and the sourceless single-step fallback) now key on the same
identifier. No behavior change for flows where def name and attribute
name coincide, which is every ordinary flow.
Two fixes addressing PR review feedback on custom-named start/end steps:

- argo_workflows_decorator.py: when the step emitting the auto-event is
  the graph's end step but has a custom name, also publish the event with
  the well-known ".end" suffix. @trigger_on_finish subscribers construct
  event names as "metaflow.<project>.<flow>.end" at deploy time and don't
  know the publisher's end step name, so without this dual emission,
  downstream flows silently miss triggers from custom-named end steps.

- card_cli.py: revert the `graph` kwarg to the old shape (dict of
  step_name -> info) to avoid breaking third-party card modules that
  expect the pre-annotation format. Cards that need start/end metadata
  can opt-in by accepting a new `graph_info` kwarg — card_cli probes the
  constructor signature and only passes it when the card declares it.

- basic.py: DefaultCard, DefaultCardJSON, and ErrorCard now accept
  `graph_info` (preferred) in addition to `graph` (backward compat).
The `func_ast=None`/`name`/`num_args` fallback in DAGNode, the
`_create_sourceless_single_step_node` method, and the
`_function_spec_step_name` branch in `_base_step_decorator` all exist to
support an upcoming FunctionSpec feature (currently shipped as an
out-of-tree extension). Without that context the code looks arbitrary —
`_function_spec_step_name` is never set anywhere in the tree, and
tolerating a missing AST seems like a design smell.

Add a module-level explainer on DAGNode that lays out the FunctionSpec
contract in one place, plus targeted inline pointers at each integration
point (DAGNode.__init__, num_args init, _create_sourceless_single_step_node,
_create_nodes except handler, and _base_step_decorator) that reference
back to it. Also flag these hooks as potentially temporary — they may be
removed if FunctionSpec is folded into core.

No behavior change.
- events.py: Trigger.from_runs now emits both the step-name-specific
  event ("metaflow.<flow>.<end_step>") and the well-known ".end" alias
  for custom-named end steps. This mirrors the dual-emit in
  argo_workflows_decorator.py so the Argo publish path and the
  programmatic from_runs path stay symmetric -- downstream filters on
  ".end" keep matching from either source.

- argo_workflows_decorator.py: fix stale idempotency comment that was
  stranded inside the new dual-emit loop. Clarify that retries
  re-publish the same event(s) and that dedupe (if needed) happens on
  the Argo Events side via the stable "id" payload.

- step_functions.py: replace the chained .get().get().get() in
  get_execution with explicit guards that produce a readable error
  ("State machine X has no state named Y") rather than an opaque
  AttributeError when a non-Metaflow state machine has an unexpected
  shape.

- client/core.py: clarify the Run._graph_endpoints docstring to note
  that every runtime path (native + orchestrators via the init command)
  writes the _parameters metadata, so the single-tier lookup with a
  literal fallback is sufficient.

Tests (13 new, 90 total passing):
- test_to_pod.py: covers primitives, containers, and the callable ->
  __qualname__ serialization used by DAGNode.node_info.
- test_graph_endpoints_fallback.py: covers metadata hit, empty metadata,
  partial metadata, caching, transient-error non-caching, and
  MetaflowNotFound caching.
Replace direct imports of unittest.mock.MagicMock with the pytest-mock
mocker fixture per team convention. The MagicMock-based helper becomes a
make_run pytest fixture that takes mocker, so cleanup is handled per test.
test_transient_error_not_cached uses mocker.MagicMock(side_effect=[...])
instead of a hand-rolled call-count closure.

Add pytest-mock to devtools/requirements-devstack.txt so the fixture is
available in tox -e unit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Cover the scenarios raised in PR review for @step(start=True, end=True)
single-step flows: Config descriptors, stacked step decorators, and
FlowMutators.

Unit tests (test/unit/test_graph_structure.py):
- Config descriptor is registered via _get_parameters.
- @Retry + @resources stack correctly on the only step.
- FlowMutator registers at class-definition time (pre_mutate only fires
  via the CLI layer, so execution is covered by the integration test).

Integration tests (test/unit/graph_inference/, new flow files + fixtures):
- Config-bearing single-step flow runs end-to-end; config value flows
  through to the end task's artifact.
- Stacked @retry/@resources flow runs; _graph_info records both
  decorators on the only step.
- FlowMutator-decorated flow runs; pre_mutate lands @Retry on the sole
  step.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pre-commit black hook collapsed the first new fixture's create_flow_fixture
call onto a single line (fits within line length); the two longer ones stay
multi-line as black reformatted them.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A flow with exactly one step is unambiguous: that step is both the
entry and the terminal node, with or without an annotation. Make the
ergonomic case work without forcing users to write
@step(start=True, end=True) for the trivial single-step shape.

Implementation: after the existing @step(start=...) / @step(end=...)
resolution, when the graph has exactly one node, point start_step
and end_step at it. Multi-step flows are unaffected, so the explicit
annotation contract still holds for real DAGs.

Drop the not-name.startswith("_") filter from _identify_start_end's
_resolve closure and from lint.check_basic_steps. The filter was
guarding against _parameters bleeding into the annotated-step list,
but _parameters is a runtime-only synthetic step and never appears
in FlowGraph.nodes. lint.check_step_names already rejects user step
names that start with "_", so there is nothing left for the filter
to exclude.

Tests: new SingleStepBareFlow plus six tests mirroring the existing
SingleStepFlow coverage (completes, graph_info, _parameters
metadata, end_task, step iteration, parent/child empty).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The dag.svelte changes in 211618d (Fix start/end step regressions)
added flow-without-start/end rendering support but were never bundled
into main.js. CI does not rebuild the card UI, so the deployed cards
have been rendering against stale bundled code.

Re-run `npm run build` in metaflow/plugins/cards/ui/ to regenerate
the bundle from the current Svelte source.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The defensive isinstance guards on graph and graph["steps"] were
no-ops in practice: the legacy else branch already relies on graph
being dict-like (uses `in graph`), so an isinstance gate that lets
non-dicts fall through to that branch would just defer the error.
The dict check on graph["steps"] guarded a phantom collision (an
old-shape dict with steps named "steps", "start_step", and
"end_step" simultaneously).

Drop both, leaving the three sentinel-key membership tests that
actually distinguish the new and legacy graph shapes. Switch the
two graph.get() calls inside the if-branch to []-indexing since
their keys are confirmed present.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pure test-file rename: where s iterates over Run/Steps and d iterates
over decorator entries, use the descriptive names instead. No
behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The dynamic single-step flow helper and its test in
test_graph_structure.py were testing the FunctionSpec hook contract
(sourceless DAGNode construction via _create_sourceless_single_step_node),
not graph topology. Move them into test_sourceless_dag_node.py with a
docstring that names the contract directly so a future contributor
does not have to follow the chain helper -> graph.py module comment ->
FunctionSpec backstory.

No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Bundle the three card-related tests into test_card_dag.py:
- test_transform_flow_graph_supports_explicit_endpoints
- test_transform_flow_graph_keeps_legacy_start_end_detection
- test_default_card_includes_custom_graph_endpoints

All three target the cards' graph data layer (transform_flow_graph
and the DAG component render path). Keeping them next to the
graph-execution tests in test_graph_inference.py blurred file
boundaries and meant test_graph_inference.py had to import
DefaultCardJSON, transform_flow_graph, and a four-line importlib
helper just for the one render test.

To support the render test cleanly, hoist the FlowSpec class loader
into conftest.py as _load_flow_class plus a custom_named_card_flow
fixture. DefaultCardJSON.render() does getattr(self.flow, step_name)
and inspect.getsource(...) to build the Task Code panel and crashes
if flow is None, so the test still needs the actual class object;
moving the loader into conftest gets the importlib dance out of the
test files.

No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The happy-path coverage (custom-named-step flows producing the right
endpoints) lives in test/ux/core/test_basic.py, which runs real flows
via the Runner / scheduler deployer chain across local and scheduler
modes. Three of the unit tests in this file were just re-asserting that
contract without exercising the metadata layer end-to-end.

Drop:
- test_metadata_carries_endpoints (covered by test_custom_step_names)
- test_partial_metadata_fills_in_defaults (edge case, dict.get semantics)
- test_result_cached (cache write is implicitly verified by
  test_metaflow_not_found_caches_fallback)

Replace the make_run factory fixture and the __class__ swap trick with
mocker.patch.object(Run, "__getitem__", ...) on a Run.__new__(Run)
instance. This is the pytest-mock idiom now codified in
CONTRIBUTING.md and matches how other unit tests will look going
forward.

Three tests remain, each pinning a non-obvious behavior:
- empty metadata falls back to ("start", "end")
- MetaflowNotFound (old run) caches the fallback
- transient exceptions do NOT cache (so a retry can succeed)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@talsperre talsperre force-pushed the romain/worktree/mutate_graph branch from 0092a57 to 1e9f658 Compare April 27, 2026 23:37
@talsperre
Copy link
Copy Markdown
Collaborator

Let's go!!!

@talsperre talsperre merged commit 8a12f03 into master Apr 28, 2026
46 checks passed
@talsperre talsperre deleted the romain/worktree/mutate_graph branch April 28, 2026 03:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants